{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# ETL Scale-Out with EMR\n",
    "> When you have the budget but not enough machines to process your data, it's time to use EMR."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 🌌 Set AWS Credentials\n",
    "> This notebook assumes that you have already set your AWS credentials on your local machine. If not, follow the steps below to set them.\n",
    "\n",
    "```bash\n",
    "aws configure\n",
    "#   AWS Access Key ID:     <your access key>\n",
    "#   AWS Secret Access Key: <your secret key>\n",
    "#   Default region name:   <your region>\n",
    "\n",
    "# only needed when you use temporary credentials\n",
    "aws configure set aws_session_token <your session token>\n",
    "```"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "AWS credentials are valid\n"
     ]
    }
   ],
   "source": [
    "from dataverse.utils.api import aws_check_credentials\n",
    "\n",
    "# check aws credentials\n",
    "# NOTE: `True` means credentials are valid\n",
    "if aws_check_credentials():\n",
    "    print(\"AWS credentials are valid\")\n",
    "else:\n",
    "    raise Exception(\"AWS credentials are invalid\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 🌌 Set up Temporary Data & Environment\n",
    "> You don't need to prepare any data here. We will create temporary data and a temporary environment for you."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 🌠 Create a Temporary Local Folder & AWS S3 Bucket"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [],
   "source": [
    "import tempfile\n",
    "import uuid\n",
    "\n",
    "from dataverse.utils.api import aws_s3_create_bucket\n",
    "\n",
    "\n",
    "# create temp local & s3 path\n",
    "tmp_folder = tempfile.TemporaryDirectory()\n",
    "tmp_bucket = uuid.uuid4().hex\n",
    "\n",
    "aws_s3_create_bucket(bucket=tmp_bucket)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 🌠 Create Temporary Data and Upload It to AWS S3\n",
    "> The sample data contains duplicated lines"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [],
   "source": [
    "import os\n",
    "import pandas as pd\n",
    "from dataverse.utils.api import aws_s3_upload\n",
    "\n",
    "\n",
    "# create sample data and upload to s3\n",
    "# NOTE: the file is written as Parquet even though it is named `duplicate.json`\n",
    "sample_path = os.path.join(tmp_folder.name, 'duplicate.json')\n",
    "\n",
    "# create UFL sample data that contains duplicated lines\n",
    "ufl = [\n",
    "    {'text': \"random text\\nduplication\"},\n",
    "    {'text': \"fixed text\\nduplication\"},\n",
    "    {'text': \"fixed text\\nduplication\\nDUPLICATION\"},\n",
    "]\n",
    "df = pd.DataFrame(ufl)\n",
    "df.to_parquet(sample_path)\n",
    "\n",
    "bucket = aws_s3_upload(\n",
    "    bucket=tmp_bucket,\n",
    "    key='duplicate.json',\n",
    "    local_path=sample_path,\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 🌠 Temporary Dynamic ETL\n",
    "> This shows that you can register a temporary custom ETL step dynamically"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Writing dynamic_etl.py\n"
     ]
    }
   ],
   "source": [
    "%%writefile dynamic_etl.py\n",
    "from dataverse.etl import register_etl\n",
    "from pyspark.rdd import RDD\n",
    "from pyspark.sql import DataFrame\n",
    "\n",
    "\n",
    "@register_etl\n",
    "def test___add___one(spark, data, subset='text', *args, **kwargs):\n",
    "    if isinstance(data, DataFrame):\n",
    "        data = data.rdd\n",
    "        data = data.map(lambda row: row.asDict())\n",
    "\n",
    "    def _add_one(row):\n",
    "        row[subset] = row[subset] + '1'\n",
    "        return row\n",
    "\n",
    "    data = data.map(_add_one)\n",
    "\n",
    "    return data"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 🌠 Create Temporary Config\n",
    "- load parquet from s3\n",
    "- exact-deduplicate lines, where each `text` is split by newline\n",
    "- append `1` to the end of each record's `text`\n",
    "- save as parquet to s3"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Detected Dataverse Bucket: dataverse-dv42-d853ea88-c87d-486f-b3b5-d780203bc262\n",
      "spark:\n",
      "  master: local[10]\n",
      "  appname: default\n",
      "  driver:\n",
      "    memory: 8G\n",
      "    maxResultSize: 2G\n",
      "  executor:\n",
      "    memory: 1G\n",
      "  local:\n",
      "    dir: /root/.cache/dataverse/tmp\n",
      "  ui:\n",
      "    port: 4040\n",
      "etl:\n",
      "- name: data_ingestion___parquet___pq2ufl\n",
      "  args:\n",
      "    path: s3a://581f4bedcaf24703b248e73d4ecefabd/duplicate.json\n",
      "    repartition: 1\n",
      "- name: deduplication___common_crawl___exact_line\n",
      "- name: test___add___one\n",
      "- name: data_load___parquet___ufl2parquet\n",
      "  args:\n",
      "    save_path: s3a://581f4bedcaf24703b248e73d4ecefabd/deduplicate.parquet\n",
      "\n"
     ]
    }
   ],
   "source": [
    "from dataverse.config import Config\n",
    "from omegaconf import OmegaConf\n",
    "\n",
    "load_path = f\"s3a://{tmp_bucket}/duplicate.json\"\n",
    "save_path = f\"s3a://{tmp_bucket}/deduplicate.parquet\"\n",
    "\n",
    "config = Config.default()\n",
    "config.etl.append({\n",
    "    'name': 'data_ingestion___parquet___pq2ufl',\n",
    "    'args': {\n",
    "        'path': load_path,\n",
    "        'repartition': 1\n",
    "    }}\n",
    ")\n",
    "config.etl.append({'name': 'deduplication___common_crawl___exact_line'})\n",
    "config.etl.append({'name': 'test___add___one'})\n",
    "config.etl.append({\n",
    "    'name': 'data_load___parquet___ufl2parquet',\n",
    "    'args': {'save_path': save_path}})\n",
    "\n",
    "print(OmegaConf.to_yaml(config))"
   ]
  },
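  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The two middle steps of this config can be sketched in plain Python to show what to expect from the sample data. This is only an illustration, not how `Dataverse` implements them on Spark: `exact_line_dedup` and `add_one` are hypothetical helpers, and stripping and lowercasing each line before comparing is an assumption made here.\n",
    "\n",
    "```python\n",
    "def exact_line_dedup(docs):\n",
    "    # keep only the first occurrence of each line across all documents\n",
    "    seen = set()\n",
    "    result = []\n",
    "    for doc in docs:\n",
    "        kept = []\n",
    "        for line in doc.splitlines():\n",
    "            key = line.strip().lower()  # assumed normalization\n",
    "            if key not in seen:\n",
    "                seen.add(key)\n",
    "                kept.append(line)\n",
    "        if kept:  # a document with no remaining lines is dropped\n",
    "            result.append('\\n'.join(kept))\n",
    "    return result\n",
    "\n",
    "\n",
    "def add_one(docs):\n",
    "    # append '1' to the end of each document\n",
    "    return [doc + '1' for doc in docs]\n",
    "\n",
    "\n",
    "sample = [\n",
    "    'random text\\nduplication',\n",
    "    'fixed text\\nduplication',\n",
    "    'fixed text\\nduplication\\nDUPLICATION',\n",
    "]\n",
    "print(add_one(exact_line_dedup(sample)))\n",
    "```\n",
    "\n",
    "Under these assumptions the third record disappears entirely, because every one of its lines already occurred in an earlier record."
   ]
  },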
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 🌌 ETLPipeline with `Local`\n",
    "> We will test our ETL pipeline on the local machine first"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 🌠 Import `dynamic_etl.py` to add custom ETL"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Detected Dataverse Bucket: dataverse-dv42-d853ea88-c87d-486f-b3b5-d780203bc262\n"
     ]
    }
   ],
   "source": [
    "# importing the module registers the custom ETL, so do it before running the pipeline\n",
    "import dynamic_etl"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 🌠 Run ETL Pipeline on the Local Machine\n",
    "> As specified in the config, we will:\n",
    "\n",
    "- load the data from s3\n",
    "- exact-deduplicate lines, where each `text` is split by newline\n",
    "- append `1` to the end of each record's `text`\n",
    "- save the result as parquet to s3"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "spark conf is set with [ temporary ] S3 credentials\n",
      ":: loading settings :: url = jar:file:/data/project/private/ducky/anaconda3/envs/llm/lib/python3.10/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "Ivy Default Cache set to: /root/.ivy2/cache\n",
      "The jars for the packages stored in: /root/.ivy2/jars\n",
      "org.apache.hadoop#hadoop-aws added as a dependency\n",
      "com.amazonaws#aws-java-sdk-bundle added as a dependency\n",
      ":: resolving dependencies :: org.apache.spark#spark-submit-parent-bbfd8d7f-e9d3-48d9-b3a0-6ac07189c03d;1.0\n",
      "\tconfs: [default]\n",
      "\tfound org.apache.hadoop#hadoop-aws;3.3.4 in central\n",
      "\tfound org.wildfly.openssl#wildfly-openssl;1.0.7.Final in central\n",
      "\tfound com.amazonaws#aws-java-sdk-bundle;1.12.592 in central\n",
      ":: resolution report :: resolve 128ms :: artifacts dl 4ms\n",
      "\t:: modules in use:\n",
      "\tcom.amazonaws#aws-java-sdk-bundle;1.12.592 from central in [default]\n",
      "\torg.apache.hadoop#hadoop-aws;3.3.4 from central in [default]\n",
      "\torg.wildfly.openssl#wildfly-openssl;1.0.7.Final from central in [default]\n",
      "\t:: evicted modules:\n",
      "\tcom.amazonaws#aws-java-sdk-bundle;1.12.262 by [com.amazonaws#aws-java-sdk-bundle;1.12.592] in [default]\n",
      "\t---------------------------------------------------------------------\n",
      "\t|                  |            modules            ||   artifacts   |\n",
      "\t|       conf       | number| search|dwnlded|evicted|| number|dwnlded|\n",
      "\t---------------------------------------------------------------------\n",
      "\t|      default     |   4   |   0   |   0   |   1   ||   3   |   0   |\n",
      "\t---------------------------------------------------------------------\n",
      ":: retrieving :: org.apache.spark#spark-submit-parent-bbfd8d7f-e9d3-48d9-b3a0-6ac07189c03d\n",
      "\tconfs: [default]\n",
      "\t0 artifacts copied, 3 already retrieved (0kB/4ms)\n",
      "23/12/14 21:39:58 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable\n",
      "Setting default log level to \"WARN\".\n",
      "To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).\n",
      "23/12/14 21:39:58 WARN SparkConf: Note that spark.local.dir will be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone/kubernetes and LOCAL_DIRS in YARN).\n",
      "23/12/14 21:40:01 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties\n",
      "23/12/14 21:40:06 WARN BlockManager: Block rdd_20_0 already exists on this machine; not re-adding it\n",
      "                                                                                \r"
     ]
    }
   ],
   "source": [
    "from dataverse.etl import ETLPipeline\n",
    "\n",
    "etl_pipeline = ETLPipeline()\n",
    "spark, data = etl_pipeline.run(config=config)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 🌠 Download Data from S3 and Check the Result"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "s3a://581f4bedcaf24703b248e73d4ecefabd/deduplicate.parquet\n"
     ]
    }
   ],
   "source": [
    "# aws s3 path\n",
    "print(save_path)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/html": [
       "<div>\n",
       "<style scoped>\n",
       "    .dataframe tbody tr th:only-of-type {\n",
       "        vertical-align: middle;\n",
       "    }\n",
       "\n",
       "    .dataframe tbody tr th {\n",
       "        vertical-align: top;\n",
       "    }\n",
       "\n",
       "    .dataframe thead th {\n",
       "        text-align: right;\n",
       "    }\n",
       "</style>\n",
       "<table border=\"1\" class=\"dataframe\">\n",
       "  <thead>\n",
       "    <tr style=\"text-align: right;\">\n",
       "      <th></th>\n",
       "      <th>text</th>\n",
       "    </tr>\n",
       "  </thead>\n",
       "  <tbody>\n",
       "    <tr>\n",
       "      <th>0</th>\n",
       "      <td>random text\\nduplication1</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>1</th>\n",
       "      <td>fixed text1</td>\n",
       "    </tr>\n",
       "  </tbody>\n",
       "</table>\n",
       "</div>"
      ],
      "text/plain": [
       "                        text\n",
       "0  random text\\nduplication1\n",
       "1                fixed text1"
      ]
     },
     "execution_count": 9,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "from dataverse.utils.api import aws_s3_path_parse\n",
    "from dataverse.utils.api import aws_s3_download\n",
    "\n",
    "\n",
    "bucket, key = aws_s3_path_parse(save_path)\n",
    "aws_s3_download(\n",
    "    bucket=bucket,\n",
    "    key=key,\n",
    "    local_path=os.path.join(tmp_folder.name, 'deduplicate.parquet'),\n",
    ")\n",
    "pd.read_parquet(os.path.join(tmp_folder.name, 'deduplicate.parquet'))"
   ]
  },
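  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The download step first splits `save_path` into a bucket and a key. As a rough picture of what such a split looks like, here is a minimal sketch using only the standard library. `split_s3_path` is a hypothetical stand-in written for this note, not the `aws_s3_path_parse` implementation, and the bucket name is made up.\n",
    "\n",
    "```python\n",
    "from urllib.parse import urlparse\n",
    "\n",
    "\n",
    "def split_s3_path(path):\n",
    "    # hypothetical helper: split an s3/s3a/s3n URI into (bucket, key)\n",
    "    parsed = urlparse(path)\n",
    "    if parsed.scheme not in ('s3', 's3a', 's3n'):\n",
    "        raise ValueError(f'not an S3 path: {path}')\n",
    "    return parsed.netloc, parsed.path.lstrip('/')\n",
    "\n",
    "\n",
    "example_bucket, example_key = split_s3_path('s3a://my-temp-bucket/deduplicate.parquet')\n",
    "print(example_bucket, example_key)\n",
    "```\n",
    "\n",
    "The bucket is the part right after the scheme, and everything after the next slash is the key."
   ]
  },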
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 🌠 Remove Result from Local & AWS S3"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {},
   "outputs": [],
   "source": [
    "import shutil\n",
    "from dataverse.utils.api import aws_s3_delete\n",
    "\n",
    "# remove saved deduplicate.parquet\n",
    "shutil.rmtree(os.path.join(tmp_folder.name, 'deduplicate.parquet'))\n",
    "aws_s3_delete(bucket=tmp_bucket, key='deduplicate.parquet')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 🌌 ETLPipeline with `EMR`\n",
    "> Works well? Let's scale out with EMR!\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 🌠 Run ETL Pipeline on EMR\n",
    "> Add `emr=True` to the ETL pipeline call. That's all! The EMR cluster is handled for you automatically.\n",
    "\n",
    "\n",
    "- set `verbose=True` to see the log of the EMR cluster\n",
    "- with `emr=True`, the second return value is the config as set by the `Dataverse` EMR Manager, not `data`\n",
    "\n",
    "```python\n",
    "# before - local\n",
    "spark, data = etl_pipeline.run(config=config)\n",
    "\n",
    "# after - EMR\n",
    "spark, config = etl_pipeline.run(config=config, emr=True)\n",
    "```"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "================================================================================\n",
      "Default instance type is [ c5.xlarge ]\n",
      "================================================================================\n",
      " vCPU: 4\n",
      " Memory: 8192\n",
      " Price: 0.088100\n",
      "================================================================================\n",
      "\n",
      "[ Dataverse ] step status: COMPLETED. Done.\n",
      "DependencyViolation occured when terminating EMR cluster. Retrying one more time\n"
     ]
    }
   ],
   "source": [
    "from dataverse.etl import ETLPipeline\n",
    "\n",
    "etl_pipeline = ETLPipeline()\n",
    "spark, config = etl_pipeline.run(config=config, emr=True)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from omegaconf import OmegaConf\n",
    "\n",
    "print(OmegaConf.to_yaml(config))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 🌠 Download Data from S3 and Check the Result"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 15,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/html": [
       "<div>\n",
       "<style scoped>\n",
       "    .dataframe tbody tr th:only-of-type {\n",
       "        vertical-align: middle;\n",
       "    }\n",
       "\n",
       "    .dataframe tbody tr th {\n",
       "        vertical-align: top;\n",
       "    }\n",
       "\n",
       "    .dataframe thead th {\n",
       "        text-align: right;\n",
       "    }\n",
       "</style>\n",
       "<table border=\"1\" class=\"dataframe\">\n",
       "  <thead>\n",
       "    <tr style=\"text-align: right;\">\n",
       "      <th></th>\n",
       "      <th>text</th>\n",
       "    </tr>\n",
       "  </thead>\n",
       "  <tbody>\n",
       "    <tr>\n",
       "      <th>0</th>\n",
       "      <td>random text\\nduplication1</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>1</th>\n",
       "      <td>fixed text1</td>\n",
       "    </tr>\n",
       "  </tbody>\n",
       "</table>\n",
       "</div>"
      ],
      "text/plain": [
       "                        text\n",
       "0  random text\\nduplication1\n",
       "1                fixed text1"
      ]
     },
     "execution_count": 15,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "from dataverse.utils.api import aws_s3_path_parse\n",
    "from dataverse.utils.api import aws_s3_download\n",
    "\n",
    "\n",
    "bucket, key = aws_s3_path_parse(save_path)\n",
    "aws_s3_download(\n",
    "    bucket=bucket,\n",
    "    key=key,\n",
    "    local_path=os.path.join(tmp_folder.name, 'deduplicate.parquet'),\n",
    ")\n",
    "pd.read_parquet(os.path.join(tmp_folder.name, 'deduplicate.parquet'))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 🌠 Remove Result from Local & AWS S3"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 16,
   "metadata": {},
   "outputs": [],
   "source": [
    "import shutil\n",
    "from dataverse.utils.api import aws_s3_delete\n",
    "\n",
    "# remove saved deduplicate.parquet\n",
    "shutil.rmtree(os.path.join(tmp_folder.name, 'deduplicate.parquet'))\n",
    "aws_s3_delete(bucket=tmp_bucket, key='deduplicate.parquet')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 🌌 Set `EMR` custom config\n",
    "> Wanna customize your EMR cluster? Let's do it!\n",
    "\n",
    "```python\n",
    "from dataverse.config import Config\n",
    "\n",
    "# if you already have your own EMR cluster, set its id in the config\n",
    "config = Config.default(emr=True)\n",
    "config.emr.id = 'j-XXXXXXXXXXXXX'  # your EMR cluster id\n",
    "```"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "spark:\n",
      "  master: local[10]\n",
      "  appname: default\n",
      "  driver:\n",
      "    memory: 8G\n",
      "    maxResultSize: 2G\n",
      "  executor:\n",
      "    memory: 1G\n",
      "  local:\n",
      "    dir: /root/.cache/dataverse/tmp\n",
      "  ui:\n",
      "    port: 4040\n",
      "etl:\n",
      "- name: data_ingestion___parquet___pq2ufl\n",
      "  args:\n",
      "    path: s3a://576768809f8a4181b034ef7921613d41/duplicate.json\n",
      "    repartition: 1\n",
      "- name: deduplication___common_crawl___exact_line\n",
      "- name: test___add___one\n",
      "- name: data_load___parquet___ufl2parquet\n",
      "  args:\n",
      "    save_path: s3a://576768809f8a4181b034ef7921613d41/deduplicate.parquet\n",
      "emr:\n",
      "  id: null\n",
      "  working_dir: null\n",
      "  name: dataverse_emr\n",
      "  release: emr-6.15.0\n",
      "  idle_timeout: 3600\n",
      "  master_instance:\n",
      "    type: null\n",
      "  core_instance:\n",
      "    type: null\n",
      "    count: 5\n",
      "  task_instance:\n",
      "    type: null\n",
      "    count: 0\n",
      "  auto_generated: null\n",
      "  role:\n",
      "    ec2:\n",
      "      name: null\n",
      "      policy_arns: null\n",
      "    emr:\n",
      "      name: null\n",
      "      policy_arns: null\n",
      "  instance_profile:\n",
      "    name: null\n",
      "    ec2_role: null\n",
      "  vpc:\n",
      "    id: null\n",
      "  subnet:\n",
      "    id: null\n",
      "    public_id: null\n",
      "    private_id: null\n",
      "    public: true\n",
      "  security_group:\n",
      "    id: null\n",
      "  gateway:\n",
      "    id: null\n",
      "  route_table:\n",
      "    id: null\n",
      "  elastic_ip:\n",
      "    id: null\n",
      "  nat_gateway:\n",
      "    id: null\n",
      "\n"
     ]
    }
   ],
   "source": [
    "from dataverse.config import Config\n",
    "from omegaconf import OmegaConf\n",
    "\n",
    "load_path = f\"s3a://{tmp_bucket}/duplicate.json\"\n",
    "save_path = f\"s3a://{tmp_bucket}/deduplicate.parquet\"\n",
    "\n",
    "# TODO: add `emr=True` to get the emr config\n",
    "# =========================================\n",
    "config = Config.default(emr=True)\n",
    "# =========================================\n",
    "\n",
    "config.etl.append({\n",
    "    'name': 'data_ingestion___parquet___pq2ufl',\n",
    "    'args': {\n",
    "        'path': load_path,\n",
    "        'repartition': 1\n",
    "    }}\n",
    ")\n",
    "config.etl.append({'name': 'deduplication___common_crawl___exact_line'})\n",
    "config.etl.append({'name': 'test___add___one'})\n",
    "config.etl.append({\n",
    "    'name': 'data_load___parquet___ufl2parquet',\n",
    "    'args': {'save_path': save_path}})\n",
    "\n",
    "# TODO: customize the emr cluster, e.g. the number of core instances\n",
    "# =========================================\n",
    "config.emr.core_instance.count = 5\n",
    "\n",
    "# TODO: there are more config options for emr\n",
    "#       check `dataverse.config.Config.default`\n",
    "# =========================================\n",
    "\n",
    "print(OmegaConf.to_yaml(config))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from dataverse.etl import ETLPipeline\n",
    "\n",
    "etl_pipeline = ETLPipeline()\n",
    "spark, config = etl_pipeline.run(config=config, emr=True)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 🌠 Download Data from S3 and Check the Result"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/html": [
       "<div>\n",
       "<style scoped>\n",
       "    .dataframe tbody tr th:only-of-type {\n",
       "        vertical-align: middle;\n",
       "    }\n",
       "\n",
       "    .dataframe tbody tr th {\n",
       "        vertical-align: top;\n",
       "    }\n",
       "\n",
       "    .dataframe thead th {\n",
       "        text-align: right;\n",
       "    }\n",
       "</style>\n",
       "<table border=\"1\" class=\"dataframe\">\n",
       "  <thead>\n",
       "    <tr style=\"text-align: right;\">\n",
       "      <th></th>\n",
       "      <th>text</th>\n",
       "    </tr>\n",
       "  </thead>\n",
       "  <tbody>\n",
       "    <tr>\n",
       "      <th>0</th>\n",
       "      <td>random text\\nduplication1</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>1</th>\n",
       "      <td>fixed text1</td>\n",
       "    </tr>\n",
       "  </tbody>\n",
       "</table>\n",
       "</div>"
      ],
      "text/plain": [
       "                        text\n",
       "0  random text\\nduplication1\n",
       "1                fixed text1"
      ]
     },
     "execution_count": 8,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "from dataverse.utils.api import aws_s3_path_parse\n",
    "from dataverse.utils.api import aws_s3_download\n",
    "\n",
    "\n",
    "bucket, key = aws_s3_path_parse(save_path)\n",
    "aws_s3_download(\n",
    "    bucket=bucket,\n",
    "    key=key,\n",
    "    local_path=os.path.join(tmp_folder.name, 'deduplicate.parquet'),\n",
    ")\n",
    "pd.read_parquet(os.path.join(tmp_folder.name, 'deduplicate.parquet'))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 🌌 Remove Temporary Data & Environment\n",
    "> It's time to clean up"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [],
   "source": [
    "from dataverse.utils.api import aws_s3_delete\n",
    "from dataverse.utils.api import aws_s3_delete_bucket\n",
    "\n",
    "!rm dynamic_etl.py\n",
    "\n",
    "# remove temp folder\n",
    "tmp_folder.cleanup()\n",
    "\n",
    "# remove temp bucket\n",
    "aws_s3_delete(bucket=tmp_bucket, key='')\n",
    "aws_s3_delete_bucket(bucket=tmp_bucket)"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "llm",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.10.11"
  },
  "orig_nbformat": 4
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
